package org.databandtech.cassandra;

import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.streaming.connectors.cassandra.CassandraSink;
import org.apache.flink.util.Collector;

/**
 CREATE KEYSPACE IF NOT EXISTS example
    WITH replication = {'class': 'SimpleStrategy', 'replication_factor': '1'};
CREATE TABLE IF NOT EXISTS example.wordcount (
    word text,
    count bigint,
    PRIMARY KEY(word)
    );
 *
 */
public class App 
{
    public static void main( String[] args )
    {
    	final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

    	DataStream<String> textDataStream = env.socketTextStream("127.0.0.1", 9999, "\n");
    	DataStream<Tuple2<String, Long>> result = textDataStream
    	        .flatMap(new FlatMapFunction<String, Tuple2<String, Long>>() {
					private static final long serialVersionUID = 1L;

					@Override
    	            public void flatMap(String value, Collector<Tuple2<String, Long>> out) {
    	                String[] words = value.toLowerCase().split("\\s");
    	                for (String word : words) {
    	                    if (!word.isEmpty()) {
    	                        out.collect(new Tuple2<String, Long>(word, 1L));
    	                    }
    	                }
    	            }
    	        })
    	        .keyBy(value -> value.f0)
    	        .timeWindow(Time.seconds(5))
    	        .sum(1);

    	try {
			CassandraSink.addSink(result)
			        .setQuery("INSERT INTO example.wordcount(word, count) values (?, ?);")
			        .setHost("127.0.0.1")
			        .build()
			        .name("cassandra Sink")
			        .disableChaining();
			
			env.execute("cassandra-sink");
		} catch (Exception e) {
			e.printStackTrace();
		}
    	
    }
}
